Do not run/enqueue event propagation when a AgentPropagateJob is already enqueued

Fixes the creation of duplicated events caused by Agent.receive! running multiple times concurrently.

Dominik Sander 8 years ago
parent
commit
73bcd927e3

+ 8 - 4
app/controllers/agents_controller.rb

@@ -113,11 +113,15 @@ class AgentsController < ApplicationController
113 113
   end
114 114
 
115 115
   def propagate
116
-    details = Agent.receive! # Eventually this should probably be scoped to the current_user.
117
-
118 116
     respond_to do |format|
119
-      format.html { redirect_back "Queued propagation calls for #{details[:event_count]} event(s) on #{details[:agent_count]} agent(s)" }
120
-      format.json { head :ok }
117
+      if AgentPropagateJob.can_enqueue?
118
+        details = Agent.receive! # Eventually this should probably be scoped to the current_user.
119
+        format.html { redirect_back "Queued propagation calls for #{details[:event_count]} event(s) on #{details[:agent_count]} agent(s)" }
120
+        format.json { head :ok }
121
+      else
122
+        format.html { redirect_back "Event propagation is already scheduled to run." }
123
+        format.json { head :locked }
124
+      end
121 125
     end
122 126
   end
123 127
 

+ 13 - 1
app/jobs/agent_propagate_job.rb

@@ -1,7 +1,19 @@
1 1
 class AgentPropagateJob < ActiveJob::Base
2
-  queue_as :default
2
+  queue_as :propagation
3 3
 
4 4
   def perform
5 5
     Agent.receive!
6 6
   end
7
+
8
+  def self.can_enqueue?
9
+    if Rails.configuration.active_job.queue_adapter == :delayed_job &&
10
+       Delayed::Job.where(failed_at: nil, queue: 'propagation').count > 0
11
+      return false
12
+    elsif Rails.configuration.active_job.queue_adapter == :resque &&
13
+          (Resque.size('propagation') > 0 ||
14
+           Resque.workers.select { |w| w.job && w.job['queue'] && w.job['queue']['propagation'] }.count > 0)
15
+      return false
16
+    end
17
+    true
18
+  end
7 19
 end

+ 1 - 0
lib/huginn_scheduler.rb

@@ -155,6 +155,7 @@ class HuginnScheduler < LongRunnable::Worker
155 155
 
156 156
   def propagate!
157 157
     with_mutex do
158
+      return unless AgentPropagateJob.can_enqueue?
158 159
       puts "Queuing event propagation"
159 160
       AgentPropagateJob.perform_later
160 161
     end

+ 10 - 1
spec/controllers/agents_controller_spec.rb

@@ -68,11 +68,20 @@ describe AgentsController do
68 68
   end
69 69
 
70 70
   describe "POST propagate" do
71
-    it "runs event propagation for all Agents" do
71
+    before(:each) do
72 72
       sign_in users(:bob)
73
+    end
74
+
75
+    it "runs event propagation for all Agents" do
73 76
       mock.proxy(Agent).receive!
74 77
       post :propagate
75 78
     end
79
+
80
+    it "does not run the propagation when a job is already enqueued" do
81
+      mock(AgentPropagateJob).can_enqueue? { false }
82
+      post :propagate
83
+      expect(flash[:notice]).to eq('Event propagation is already scheduled to run.')
84
+    end
76 85
   end
77 86
 
78 87
   describe "GET show" do

+ 24 - 0
spec/jobs/agent_propagate_job_spec.rb

@@ -0,0 +1,24 @@
1
+require 'rails_helper'
2
+
3
+describe AgentPropagateJob do
4
+  it "calls Agent.receive! when run" do
5
+    mock(Agent).receive!
6
+    AgentPropagateJob.new.perform
7
+  end
8
+
9
+  context "#can_enqueue?" do
10
+    it "is truthy when no propagation job is queued" do
11
+      expect(AgentPropagateJob.can_enqueue?).to be_truthy
12
+    end
13
+
14
+    it "is falsy when a progation job is queued" do
15
+      Delayed::Job.create!(queue: 'propagation')
16
+      expect(AgentPropagateJob.can_enqueue?).to be_falsy
17
+    end
18
+
19
+    it "is truthy when a enqueued progation job failed" do
20
+      Delayed::Job.create!(queue: 'propagation', failed_at: Time.now - 1.minute)
21
+      expect(AgentPropagateJob.can_enqueue?).to be_truthy
22
+    end
23
+  end
24
+end